【S3からS3へ】Node.js の Streaming API を使って Lambda Function のみで CSVファイルを JSON Lines ファイルへ変換する

【S3からS3へ】Node.js の Streaming API を使って Lambda Function のみで CSVファイルを JSON Lines ファイルへ変換する

Lambda Function でもETLしたい
Clock Icon2020.05.16

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

サーバーレスアプリケーションで、S3 にあるファイルを変換したいケースでは、AWS Glue を使った ETL が思い浮かびます。もちろんそれでもよいのですが、下準備のたいへんさや変換にかかる時間を考えると、何かとおおがかりな印象です。料金も手放しで安い!と言い切れるほどではありません。かといって、Lambda Function で S3 GetObject を使い一度メモリにすべて読み込んで処理するのは筋が悪そうです。

そこで Node.jsの Streaming API を使い、メモリを圧迫せず、かつ Lambda Function のみで処理する手段を試してみます。

CSVファイルを JSON Lines へ変換する

人間にとって見やすいのは表形式との相性がよいCSVですが、ソフトウェアで扱いやすいのはデータ型を表現できキー名もつけられるJSONです。よってこの2形式の相互変換は、多くのシステムで機能の一部として実装されることでしょう。今回はクラスメソッドで架空の大規模アンケートが実施されたことを想定し、その結果CSVファイルを JSON Lines へ変換する処理を考えてみます。次のような変換です。

回答ID,a1,a2,a3,a4,a5,a6,a7,a8,回答日時
classmethod_answer_0,2,9,1,7,5,5,4,Kesase dunwu it nisfen iwo vogal ako po giser hosjobik.,2020-05-15T14:01:12.299+09:00
classmethod_answer_1,5,9,4,8,9,1,9,Zecad hubo urrabiw ecsi rob futkic zesi fo fono imki.,2020-05-15T14:01:12.306+09:00

このCSVファイルを次の JSON Lines に変換します。

{"answerId":"classmethod_answer_0","answerAt":"2020-05-15T14:01:12.299+09:00","answerDataJsonString":"{\"a1\":\"2\",\"a2\":\"9\",\"a3\":\"1\",\"a4\":\"7\",\"a5\":\"5\",\"a6\":\"5\",\"a7\":\"4\",\"a8\":\"Kesase dunwu it nisfen iwo vogal ako po giser hosjobik.\"}"}
{"answerId":"classmethod_answer_1","answerAt":"2020-05-15T14:01:12.306+09:00","answerDataJsonString":"{\"a1\":\"5\",\"a2\":\"9\",\"a3\":\"4\",\"a4\":\"8\",\"a5\":\"9\",\"a6\":\"1\",\"a7\":\"9\",\"a8\":\"Zecad hubo urrabiw ecsi rob futkic zesi fo fono imki.\"}"}

CSVファイルがS3に保存されている前提で、変換後の JSON Lines ファイルもS3にアップロードすること を考えていきます。

images/s3csvtos3json.png

次の処理が適用されています。

  1. CSVの1レコードは、JSON一行として JSON Lines に変換される
  2. CSVヘッダはJSONのキーに対応付けるが、名前は変える
  3. 回答内容はひとつのJSONキーに文字列としてまとめてしまう

使うもの

Node.js Streaming API

あらゆるデータの流れを Streaming オブジェクトとして抽象的に扱え、ストリームに対して専用のAPIを利用できるようにする標準モジュールです。ファイルや標準出力、HTTPサーバーなど、さまざまなデータがストリームとして提供されているので、メモリを浪費せずに連続したデータを扱うことができます。S3で扱う方法は後述します。

fast-csv

npmモジュールです。スター数など発展途上ですが、 Streaming APIでCSVファイルを扱いたいならこれでいいのでは というくらい便利で機能が豊富です。

Lambda Function

今回の実行環境。AWSサービスとして次のような制約があります。

  • メモリサイズは 128MBから3008MB
  • 実行時間は最大 15分

その代わりサーバーをたている必要もなく、たてたサーバーを消す必要もない(これ大事)イカすやつです。

環境とバージョン

パッケージ名 バージョン 用途
aws-cdk 1.38.0 Lambda Function と S3 をAWSへデプロイするため
fast-csv 4.1.4 Streaming API で CSV をパースするため
aws-sdk 2.677.0 S3のダウンロード、S3へのアップロードで利用する

CSV to JSON Lines

fast-json のドキュメントを見ながら実装します。transform をうまく使えば実装できそうです。

const { EOL } = require('os');
const { parse } = require('fast-csv');
const stream = require('stream')

// 入力CSVをまずはテキストで用意します
const CSV_STRING = [
  '回答ID,a1,a2,a3,a4,a5,a6,a7,a8,回答日時',
  'classmethod_answer_0,2,9,1,7,5,5,4,Kesase dunwu it nisfen iwo vogal ako po giser hosjobik.,2020-05-15T14:01:12.299+09:00',
  'classmethod_answer_1,5,9,4,8,9,1,9,Zecad hubo urrabiw ecsi rob futkic zesi fo fono imki.,2020-05-15T14:01:12.306+09:00',
].join(EOL);

// テキストから ReadStreamを作ります
stream.Readable.from(CSV_STRING)
  .pipe(parse({ headers: true }))

  // ここで CSV の1レコードを JSON Lines の1レコードに変換しています
  .transform(row => {
    const jsonLines = {
      answerId: row['回答ID'],
      answerAt: row['回答日時'],
      answerDataJsonString: JSON.stringify(
        Object.entries(row)
          .filter(([key, value], _) => key.includes('a'))
          .reduce((obj, [key, value]) => {
            obj[key] = value;
            return obj;
          }, {}),
      ),
    };
    return JSON.stringify(jsonLines) + '\n';
  })

  // 標準出力の WriteStream へ pipe します
  .pipe(process.stdout)
  .on('error', error => console.error(error))

出力結果は次のようになります。

$ node converter.js

{"answerId":"classmethod_answer_0","answerAt":"2020-05-15T14:01:12.299+09:00","answerDataJsonString":"{\"a1\":\"2\",\"a2\":\"9\",\"a3\":\"1\",\"a4\":\"7\",\"a5\":\"5\",\"a6\":\"5\",\"a7\":\"4\",\"a8\":\"Kesase dunwu it nisfen iwo vogal ako po giser hosjobik.\"}"}
{"answerId":"classmethod_answer_1","answerAt":"2020-05-15T14:01:12.306+09:00","answerDataJsonString":"{\"a1\":\"5\",\"a2\":\"9\",\"a3\":\"4\",\"a4\":\"8\",\"a5\":\"9\",\"a6\":\"1\",\"a7\":\"9\",\"a8\":\"Zecad hubo urrabiw ecsi rob futkic zesi fo fono imki.\"}"}

fast-csv を使えば transform にて CSV の1レコードを分を扱うことができるため、ここで JSON に変換してやればよいです。要件にあげていた、

  • CSVヘッダはJSONのキーに対応付けるが、名前は変える
  • 回答内容はひとつのJSONキーに文字列としてまとめてしまう

これらもここでやっています。変換部分については、これで問題なさそうです。

S3 の入出力を Streaming で

先の例では ReadStream は文字列、WriteStream は標準出力でした。理屈の上では、これらをどちらもS3に置き換えることができればそのまま変換処理が使えることになります。

S3 の ReadStream

実はこれは簡単です。aws-sdk JavaScript では、 s3.getObject(...).createReadStream() が用意されています。

コード例を引用します。

var fileStream = fs.createWriteStream('/path/to/file.jpg');
var s3Stream = s3.getObject({Bucket: 'myBucket', Key: 'myImageFile.jpg'}).createReadStream();

// Listen for errors returned by the service
s3Stream.on('error', function(err) {
    // NoSuchKey: The specified key does not exist
    console.error(err);
});

s3Stream.pipe(fileStream).on('error', function(err) {
    // capture any errors that occur when writing data to the file
    console.error('File Stream:', err);
}).on('close', function() {
    console.log('Done.');
});

S3上にあるCSVファイルに対しても、同じように利用できそうです。

S3 の WriteStream

問題はこちらです。S3では、s3.put(...)以外に、よりレイヤの低いs3.upload(...)が用意されています。

Uploads an arbitrarily sized buffer, blob, or stream , using intelligent concurrent handling of parts if the payload is large enough. You can configure the concurrent queue size by setting options. Note that this is the only operation for which the SDK can retry requests with stream bodies.

Stream を Body に指定可能と書いています。ですので、createReadStreamでとってきた ReadStream を アップロード側のBodyに渡してやれば良さそう…なのですがちょっと困ったことになりました。createReadStreamでとってきた ReadStream には、transform で JSON Lines に変換する処理をかましたいのでした。よって、upload の Body に指定可能な ReadStream としての役割もこなしつつ、変換された JSON Lines の投入も受け付ける中間 Stream が必要です。Node.js の Streaming API にはこの役割に当てはまる PassThrough が用意されています。

PassThrough を使ってアップロードのための WriteStream を作るには、次のようにします。

const stream = require('stream')

function uploadStream() {
  const pass = new stream.PassThrough() // PassThrough を作ります
  return {
    writeStream: pass, // ここに変換後の JSON Lines を投入していきます
    promise: s3.upload({
      Bucket: 'answer-bucket',
      Key: 'answer_converted.json',
      Body: pass    // PassThrough はアップロードデータにもなります
    }).promise()
  }
}

const {writeStream, promise} = uploadStream()

Lambda Function では実行中であっても Function の終わりに到達してしまうとコンテナが破棄されるため、アップロード処理の Promise も返すようにし、これを await することでアップロードの終わりまで待機させるようにします。準備が整いました。

  • S3 CSV の ReadStream
  • CSVからJSON Linesに変換する fast-csv を利用した transform
  • S3 JSON Lines の WriteStream

すべて手に入ったのでつなげます。

Lambda Function のコード

ここで TypeScript にします。

import { LambdaContext } from '../../lambda-context';
import * as stream from 'stream';
import * as aws from 'aws-sdk';
import * as csv from 'fast-csv';

const s3 = new aws.S3({
    region: 'ap-northeast-1',
});

// CSVからJSON Linesに変換する fast-csv を利用した `transform`
function convertRow(row: any): string {
    const jsonLines = {
        answerId: row['回答ID'],
        answerAt: row['回答日時'],
        answerDataJsonString: JSON.stringify(
            Object.entries(row)
                .filter(([key, value], _) => key.includes('a'))
                .reduce((obj: any, [key, value]) => {
                    obj[key] = value;
                    return obj;
                }, {}),
        ),
    };
    return JSON.stringify(jsonLines) + '\n';
}

// これを handler として呼び出す
export async function streaming(event: any, context?: any): Promise<void> {
    console.time('stream');

    // S3 JSON Lines の WriteStream
    const uploadStream = (): any => {
        const pass = new stream.PassThrough();
        return {
            writeStream: pass,
            promise: s3
                .upload({
                    Bucket: process.env.BUCKET_NAME!,
                    Key: 'converted_streaming.json',
                    Body: pass,
                })
                .promise(),
        };
    };
    const { writeStream, promise } = uploadStream();

    // S3 CSV の ReadStream
    const readStream = await s3
        .getObject({
            Bucket: process.env.BUCKET_NAME!,
            Key: 'generated.csv',
        })
        .createReadStream();


    // すべてをつなげる
    readStream
        .pipe(csv.parse({ headers: true }))
        .transform(convertRow)
        .pipe(writeStream);

    // アップロード完了まで待つ
    await promise;
    console.timeEnd('stream');
}

メモリを圧迫せずにCSVをJSONへ変換するコードの完成です。128MBメモリの Lambda Function で大きなファイルサイズの変換もこのとおり。

large-file-streaming.png

次に訪れるボトルネックは Lambda Function の 実行時間制限 "15分"

ここまでなんとなく想像がついた方もいらっしゃるかもしれませんが、メモリ利用量が問題でなくなったとして、次にボトルネックになるのは実行時間です。15分という制限時間の中で、実際どのくらいのファイルサイズを扱えるのか、気になるところです。Lambda Function で実行している処理内容に大きく依存する、というのは間違いないのですが、今回の実装で参考値として出しておく意味はあります。

これについては別の記事で試してみます。

まとめ

Node.js の Streaming API と、aws-sdk の S3 オブジェクトをうまく使うことで、ストリーミング処理による S3 to S3 の変換処理が実現できました。「だいたいこれくらいのファイルサイズまでならいけますよ」という合意がとれるのであれば、Lambda Function のみで完結することは十分アリだと考えます。

また、ストリーミング処理における fast-csv をうまく使うことで、簡単に CSV を JSON Liness に変換できることも示しました。ぜひいろいろなCSVファイルのパターンで使ってみてください。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.